Project: Traffic Sign Recognition Classifier using deep learning


The solution has been prepared in accordance with the rubric defined here.


Step 1: Initialize the common libraries

In [68]:
SEED=5469
BASE_DIR='./traffic-signs-data'
SIGNNAMES_DIR = BASE_DIR + '/signnames/'
OUT_DIR ='./traffic-signs-classification'

# Standard libs
import pickle
import csv
from timeit import default_timer as timer
import os
import sys

#Visualisation
%matplotlib inline
from tqdm import tqdm_notebook

import matplotlib.pyplot as plt 
from IPython.display import Image
from IPython.display import display

# numerical libs 
import cv2
import math

import random
import numpy as np
random.seed(SEED)
np.random.seed(SEED)

import tensorflow as tf
tf.set_random_seed(SEED)

from tensorflow.python.training import moving_averages
from tensorflow.contrib.framework import add_model_variable
sess = tf.InteractiveSession()
progressbar_width = '500'

Step 2: Load The Data

The pickled data is a dictionary with 4 key/value pairs:

  • 'features' is a 4D array containing raw pixel data of the traffic sign images, (num examples, width, height, channels).
  • 'labels' is a 1D array containing the label/class id of the traffic sign. The file signnames.csv contains id -> name mappings for each id.
  • 'sizes' is a list containing tuples, (width, height), representing the original width and height of the image.
  • 'coords' is a list containing tuples, (x1, y1, x2, y2), representing the coordinates of a bounding box around the sign in the image. These coordinates assume the original image; the pickled data contains the resized (32x32) versions of these images.
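
As a quick sanity check, this structure can be inspected directly (a minimal sketch, assuming train.p sits under BASE_DIR as defined in Step 1):

In [ ]:
# Inspect the pickled training set: keys and array shapes
import pickle

with open(BASE_DIR + '/train.p', mode='rb') as f:
    train = pickle.load(f)

print(sorted(train.keys()))     # ['coords', 'features', 'labels', 'sizes']
print(train['features'].shape)  # (34799, 32, 32, 3) per the summary below
print(train['labels'].shape)    # (34799,)
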
In [69]:
# Load pickled dataset

def load_data(): 
    training_data  = BASE_DIR + '/train.p'
    testing_data   = BASE_DIR + '/test.p'
    classname_data = BASE_DIR + '/signnames.csv'

    classnames = []
    with open(classname_data) as _f:
        rows = csv.reader(_f, delimiter=',')
        next(rows, None)  # skip the headers
        for i, row in enumerate(rows):
            assert(i==int(row[0]))
            classnames.append(row[1])
 
    with open(training_data, mode='rb') as f:
        train = pickle.load(f)
    with open(testing_data, mode='rb') as f:
        test = pickle.load(f)

    X_train, y_train = train['features'], train['labels']
    X_test, y_test   = test['features'], test['labels']
    
    
    X_train  = X_train.astype(np.float32)
    y_train  = y_train.astype(np.int32)
    X_test   = X_test.astype(np.float32)
    y_test   = y_test.astype(np.int32)
    
    return  classnames, X_train, y_train, X_test, y_test

Step 3: Dataset Summary & Exploration

Here a basic summary of the data set is presented:

  • Number of training examples: 34799
  • Number of testing examples: 12630
  • Image data shape: (32, 32, 3)
  • Number of classes: 43
In [70]:
### Basic summary of the dataset.

classnames, X_train, y_train, X_test, y_test = load_data() 
 
# Number of training examples 
num_train = len(X_train)

# Number of testing examples.
num_test = len(X_test)

# Shape of a traffic sign image
_, height, width, channel = X_train.shape
image_shape = (height, width, channel)

# Number of unique classes/labels in the dataset
num_class = len(np.unique(y_train))

num_total = num_train + num_test

print("Number of training examples =", num_train )
print("Number of testing examples =", num_test )
print("Image data shape =", image_shape)
print("Number of classes =", num_class)
Number of training examples = 34799
Number of testing examples = 12630
Image data shape = (32, 32, 3)
Number of classes = 43

Step 4: Exploratory visualization of the dataset

Here, the German Traffic Signs Dataset has been visualized using the pickled dataset available from the course. Below, a histogram analysis of the training and testing set images has been prepared for all 43 classes of traffic signs present in the pickled dataset.

In [71]:
### Data exploration visualization goes here.
# Helper functions to draw graphs, etc.
def get_image_label(c): 
    img=cv2.imread(SIGNNAMES_DIR + str(c) + '.jpg',1)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    label_image = cv2.resize(img, (32,32))
    return label_image

def insert_subimage(image, sub_image, y, x): 
    h, w, c = sub_image.shape
    image[y:y+h, x:x+w, :]=sub_image 
    return image


def display_dataset(images, labels, dataset_type):
    data_images, data_labels = images, labels
    #results image
    num_sample=15
    results_image = 255.*np.ones(shape=((num_class+2)*height,(num_sample+2+22)*width, channel),dtype=np.float32)
    cv2.putText(results_image, "Traffic signs class and label", (0, height), cv2.FONT_HERSHEY_SIMPLEX,0.75,(0,0,255),2)
    cv2.putText(results_image, "Traffic signs " + dataset_type + "ing dataset", (width+520, height), cv2.FONT_HERSHEY_SIMPLEX,0.75,(255,0,0),2)
    cv2.putText(results_image, "Class Frequency", ((2+num_sample+15)*width, height), cv2.FONT_HERSHEY_SIMPLEX,0.75,(0,0,255),2)
    cv2.line(results_image,(0,height+10),(results_image.shape[1],height+10),(0,0,127),1)  # shape[1] is the image width

    for c in tqdm_notebook(range(num_class), desc="Loading " + dataset_type + "ing"):
        label_image = get_image_label(c)
        insert_subimage(results_image, label_image, (c+2)*height, 450)

        # Calculate the mean image parameters
        idx = list(np.where(data_labels== c)[0])
        mean_image = np.average(data_images[idx], axis=0)
        insert_subimage(results_image, mean_image, (c+2)*height, width+460)

        # Make random samples
        for n in range(num_sample):
            sample_image = data_images[np.random.choice(idx)]
            insert_subimage(results_image, sample_image, (c+2)*height, (2+n)*width+460)

        # Dataset summary
        count=len(idx)      
        percentage = float(count)/float(num_total)
        cv2.putText(results_image, '%02d:%-6s'%(c, classnames[c]), (0, int(((c+2)+0.7)*height)),cv2.FONT_HERSHEY_SIMPLEX,0.5,(0,0,0),1)
        cv2.putText(results_image, '[%4d]'%(count), ((2+num_sample+15)*width, int(((c+2)+0.7)*height)),cv2.FONT_HERSHEY_SIMPLEX,0.5,(0,200,100),1)
        cv2.rectangle(results_image,((2+num_sample+17)*width, (c+2)*height),((2+num_sample+17)*width + round(percentage*3000), ((c+2)+1)*height),(127*(c%2),0,255),-1)

    cv2.imwrite(BASE_DIR+'/data_' + dataset_type + '.jpg',cv2.cvtColor(results_image, cv2.COLOR_BGR2RGB))

    plt.rcParams["figure.figsize"] = (25,25)
    plt.imshow(results_image.astype(np.uint8))
    plt.axis('off') 
    plt.show()
In [72]:
display_dataset (X_train, y_train, "train")

from time import sleep
sleep(5) # Time in seconds.
display_dataset (X_test, y_test, "test")



Step 5: Design and Test a Model Architecture

Here I focus on designing and implementing a deep learning model that learns to recognize traffic signs. Training and testing of the model is done on the German Traffic Sign Dataset.

A. Dataset augmentation and generating fake dataset

The training data is initially split into training and validation sets using a fixed random seed. Next, the dataset is expanded and augmented with fake data using several techniques:

  • Data flipping
  • Adding different brightness, contrast and saturation
  • Data perturbation by rotation, scaling, translation and perspective distortion

It is also ensured that each class gets a similar number of training samples. Where flipping maps an image into a different class, the new label is assigned accordingly; for example, flipping 'Turn left ahead' horizontally yields 'Turn right ahead'.

In [73]:
# split into train and validation. 
def split_data(X_train, y_train, num_valid=3000): 
  
    num   = len(y_train)   # ~40000 
    index = list(range(num))
    random.shuffle(index)
    train_index=index[num_valid:]
    valid_index=index[:num_valid]


    train_images = X_train[train_index] 
    train_labels = y_train[train_index]
    valid_images = X_train[valid_index] 
    valid_labels = y_train[valid_index] 
   
    return  train_images, train_labels,  valid_images, valid_labels
In [74]:
#  This expands the train data by flipping.
#  Note: this code is from : http://navoshta.com/traffic-signs-classification/
def extend_data_by_flipping(images, labels):

    X=images
    y=labels

    # Classes of signs that, when flipped horizontally, should still be classified as the same class
    self_flippable_horizontally = np.array([11, 12, 13, 15, 17, 18, 22, 26, 30, 35])
    # Classes of signs that, when flipped vertically, should still be classified as the same class
    self_flippable_vertically = np.array([1, 5, 12, 15, 17])
    # Classes of signs that, when flipped horizontally and then vertically, should still be classified as the same class
    self_flippable_both = np.array([32, 40])
    # Classes of signs that, when flipped horizontally, would still be meaningful, but should be classified as some other class
    cross_flippable = np.array([
        [19, 20],
        [33, 34],
        [36, 37],
        [38, 39],
        [20, 19],
        [34, 33],
        [37, 36],
        [39, 38],
    ])
    num_classes = 43

    X_extended = np.empty([0, X.shape[1], X.shape[2], X.shape[3]], dtype=np.float32)
    y_extended = np.empty([0], dtype=np.int32)

    for c in tqdm_notebook(range(num_classes), desc="Flipping", ncols=progressbar_width):
        # First copy existing data for this class
        X_extended = np.append(X_extended, X[y == c], axis=0)
        # If we can flip images of this class horizontally and they would still belong to said class...
        if c in self_flippable_horizontally:
            # ...Copy their flipped versions into extended array.
            X_extended = np.append(X_extended, X[y == c][:, :, ::-1, :], axis=0)
        # If we can flip images of this class horizontally and they would belong to other class...
        if c in cross_flippable[:, 0]:
            # ...Copy flipped images of that other class to the extended array.
            flip_class = cross_flippable[cross_flippable[:, 0] == c][0][1]
            X_extended = np.append(X_extended, X[y == flip_class][:, :, ::-1, :], axis=0)
        # Fill labels for added images set to current class.
        y_extended = np.append(y_extended, np.full((X_extended.shape[0] - y_extended.shape[0]), c, dtype=np.int32))

        # If we can flip images of this class vertically and they would still belong to said class...
        if c in self_flippable_vertically:
            # ...Copy their flipped versions into extended array.
            X_extended = np.append(X_extended, X_extended[y_extended == c][:, ::-1, :, :], axis=0)
        # Fill labels for added images set to current class.
        y_extended = np.append(y_extended, np.full((X_extended.shape[0] - y_extended.shape[0]), c, dtype=np.int32))

        # If we can flip images of this class horizontally AND vertically and they would still belong to said class...
        if c in self_flippable_both:
            # ...Copy their flipped versions into extended array.
            X_extended = np.append(X_extended, X_extended[y_extended == c][:, ::-1, ::-1, :], axis=0)
        # Fill labels for added images set to current class.
        y_extended = np.append(y_extended, np.full((X_extended.shape[0] - y_extended.shape[0]), c, dtype=np.int32))

    extend_datas  = X_extended
    extend_labels = y_extended
    return (extend_datas, extend_labels)
In [75]:
# use opencv to do data augmentation

def perturb(image, keep, angle_limit=15, scale_limit=0.1, translate_limit=3, distort_limit=3, illumin_limit=0.7):

    u=np.random.uniform()
    if u>keep :
        (H, W, C) = image.shape  # numpy shape is (rows=height, cols=width, channels)
        center = np.array([W / 2., H / 2.])
        da = np.random.uniform(low=-1, high=1) * angle_limit/180. * math.pi
        scale = np.random.uniform(low=-1, high=1) * scale_limit + 1

        cc = scale*math.cos(da)
        ss = scale*math.sin(da)
        rotation    = np.array([[cc, ss],[-ss,cc]])
        translation = np.random.uniform(low=-1, high=1, size=(1,2)) * translate_limit
        distort     = np.random.standard_normal(size=(4,2)) * distort_limit

        pts1 = np.array([[0., 0.], [0., H], [W, H], [W, 0.]])
        pts2 = np.matmul(pts1-center, rotation) + center  + translation

        #add perspective noise
        pts2 = pts2 + distort

        matrix  = cv2.getPerspectiveTransform(pts1.astype(np.float32), pts2.astype(np.float32)) 
        perturb = cv2.warpPerspective(image, matrix, (W, H), flags=cv2.INTER_LINEAR,
                                      borderMode=cv2.BORDER_REFLECT_101)  # BORDER_WRAP  #BORDER_REFLECT_101  #cv2.BORDER_CONSTANT  BORDER_REPLICATE

        # Add brightness, contrast, saturation
        if 1:  #brightness
            alpha = 1.0 + illumin_limit*random.uniform(-1, 1)
            perturb *= alpha
            perturb = np.clip(perturb,0.,255.)
            pass

        if 1:  #contrast
            coef = np.array([[[0.299, 0.587, 0.114]]]) #rgb to gray (YCbCr) :  Y = 0.299R + 0.587G + 0.114B

            alpha = 1.0 + illumin_limit*random.uniform(-1, 1)
            gray = perturb * coef
            gray = (3.0 * (1.0 - alpha) / gray.size) * np.sum(gray)
            perturb *= alpha
            perturb += gray
            perturb = np.clip(perturb,0.,255.)
            pass

        if 1:  #saturation
            coef = np.array([[[0.299, 0.587, 0.114]]]) #rgb to gray (YCbCr) :  Y = 0.299R + 0.587G + 0.114B

            alpha = 1.0 + illumin_limit*random.uniform(-1, 1)
            gray = perturb * coef
            gray = np.sum(gray, axis=2, keepdims=True)
            gray *= (1.0 - alpha)
            perturb *= alpha
            perturb += gray
            perturb = np.clip(perturb,0.,255.)
            pass

        return perturb

    else:
        return image
    
    
def make_perturb_images(images, keep):
    perturbed = np.zeros(images.shape, dtype=np.float32)
    for n in tqdm_notebook(range(len(images)), desc="Perturbing", ncols=progressbar_width):
        perturbed[n] = perturb(images[n], keep=keep)

    return perturbed


# sample and shuffle the data such that each class has equal number of samples for training
def shuffle_data_uniform(datas, labels, num_class, num_per_class=None):

    if num_per_class is None:
        max_count = 0
        for c in range(num_class):
            idx = list(np.where(labels == c)[0])
            count = len(idx)
            max_count = max(count, max_count)
        num_per_class = max_count

    index = []
    for c in range(num_class):
        idx = list(np.where(labels == c)[0])
        index = index + list(np.random.choice(idx, num_per_class))

    random.shuffle(index)
    shuffle_datas  = datas[index]
    shuffle_labels = labels[index]

    return shuffle_datas, shuffle_labels

# generate the next batch for SGD
def generate_train_batch_next(datas, labels, n, batch_size):
    i = n*batch_size
    batch_datas  = datas [i:i+batch_size]
    batch_labels = labels[i:i+batch_size]
    return batch_datas, batch_labels
In [76]:
#prepare all data here 
classnames, X_train, y_train, X_test, y_test = load_data() 

train_images, train_labels,  valid_images, valid_labels = split_data(X_train, y_train)
test_images, test_labels = X_test, y_test
 
num_train = len(train_images)
num_valid = len(valid_images)
num_test  = len(test_images)
    
print('** Dataset details **')
print('Height, width, channel = %d, %d, %d'%(height, width, channel))
print('Number of test set  = %d'%num_test)
print('Number of validation set = %d'%num_valid)
print('Number of training set = %d'%num_train)

# train data flipping
train_images, train_labels = extend_data_by_flipping(train_images, train_labels) 
num_train_flip = len(train_images)
print('')
print('Number of training set(after flip)= %d' % num_train_flip )
 
#train data augmentation 
keep = 0.20   # 0.50   0.25 0.20   0.15
num_per_class = 1000
num_augmented = num_per_class*num_class
augmented_images, augmented_labels = shuffle_data_uniform(train_images, train_labels, num_class, num_per_class=num_per_class)
augmented_images = make_perturb_images(augmented_images, keep=keep) 
num_augmented = len(augmented_images) 
print('Number of augmented images = %d' % num_augmented)  
 

# Newly generated training data
print('\n')
print('Examples of augmented images (First column is the original image)')

# results image
num_sample = 20
perturbance_per_sample = 20

results_image = 255. * np.ones(shape=(num_sample * height, (perturbance_per_sample+1)* width+10, channel),dtype=np.float32)
for j in tqdm_notebook(range(num_sample), desc="Loading display", ncols=progressbar_width):
    i = random.randint(0, num_train_flip - 1)

    image = train_images[i]
    insert_subimage(results_image, image, j * height, 0)

    for k in range(0, perturbance_per_sample):
        perturb_image = perturb(image, keep=0)
        insert_subimage(results_image, perturb_image, j*height, (k+1)*width+10)

         
cv2.imwrite(BASE_DIR+'/data_augmented.jpg',cv2.cvtColor(results_image, cv2.COLOR_BGR2RGB))
plt.rcParams["figure.figsize"] = (25,25)
plt.imshow(results_image.astype(np.uint8))
plt.axis('off') 
plt.show()
** Dataset details **
Height, width, channel = 32, 32, 3
Number of test set  = 12630
Number of validation set = 3000
Number of training set = 31799


Number of training set(after flip)= 54674

Number of augmented images = 43000


Examples of augmented images (First column is the original image)

B. Model Architecture

The dense block from the paper "Densely Connected Convolutional Networks" (Gao Huang, Zhuang Liu, Kilian Q. Weinberger, Laurens van der Maaten, arXiv 2016) was used.

Reused with some modifications:

  • The paper uses batch normalization - relu - conv, which I used as the basic layer ordering for maximum information flow between layers in the network. All layers with matching feature-map sizes are connected directly with each other.
  • The paper uses dropout inside the block, but here the dropout is shifted outside the block (see the network construction later; a worked channel-growth example follows this list).
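
To make the concatenation pattern concrete, here is the channel arithmetic for one of the dense blocks defined below (block2: num=4 layers of num_kernels=16, fed by the 32-channel output of block1). This is illustrative arithmetic only, not part of the network:

In [ ]:
# Channel growth in a dense block: each of `num` layers appends `num_kernels`
# feature maps to the running concatenation along the channel axis.
channels, num, growth = 32, 4, 16
for n in range(num):
    channels += growth   # concat((block, conv)) adds `growth` channels
print(channels)          # 96 channels leave block2
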
In [10]:
### Define your architecture here.

## global variables ##
IS_TRAIN_PHASE = tf.placeholder(dtype=tf.bool, name='is_train_phase')

def conv2d(x, num_kernels=1, kernel_size=(1,1), stride=[1,1,1,1], padding='SAME', has_bias=True, name='conv'):

    input_shape = x.get_shape().as_list()
    assert len(input_shape)==4
    C = input_shape[3]
    H = kernel_size[0]
    W = kernel_size[1]
    K = num_kernels

    ##[filter_height, filter_width, in_channels, out_channels]
    w    = tf.get_variable(name=name+'_weight', shape=[H, W, C, K], initializer=tf.truncated_normal_initializer(stddev=0.1))
    conv = tf.nn.conv2d(x, w, strides=stride, padding=padding, name=name)
    if has_bias:
        b = tf.get_variable(name=name + '_bias', shape=[K], initializer=tf.constant_initializer(0.0))
        conv = conv+b

    return conv


def relu(x, name='relu'):
    act = tf.nn.relu(x, name=name)
    return act

def prelu(x, name='prelu'):
    alpha = tf.get_variable(name=name+'_alpha', shape=x.get_shape()[-1],
                            initializer=tf.random_uniform_initializer(minval=0.1, maxval=0.3),
                            dtype=tf.float32)
    pos = tf.nn.relu(x)
    neg = alpha * (x - abs(x)) * 0.5

    return pos + neg


# very leaky relu
#def vlrelu(x, alpha=0.25, name='vlrelu'): #  alpha between 0.1 to 0.5
#    act =tf.maximum(alpha*x,x)
#    return act

def maxpool(x, kernel_size=(1,1), stride=[1,1,1,1], padding='SAME', has_bias=True, name='max' ):
    H = kernel_size[0]
    W = kernel_size[1]
    pool = tf.nn.max_pool(x, ksize=[1, H, W, 1], strides=stride, padding=padding, name=name)
    return pool

def avgpool(x, kernel_size=(1,1), stride=[1,1,1,1], padding='SAME', has_bias=True, is_global_pool=False, name='avg'):

    if is_global_pool==True:
        input_shape = x.get_shape().as_list()
        assert len(input_shape) == 4
        H = input_shape[1]
        W = input_shape[2]

        pool = tf.nn.avg_pool(x, ksize=[1, H, W, 1], strides=[1,H,W,1], padding='VALID', name=name)
        pool = flatten(pool)

    else:
        H = kernel_size[0]
        W = kernel_size[1]
        pool = tf.nn.avg_pool(x, ksize=[1, H, W, 1], strides=stride, padding=padding, name=name)

    return pool


def dropout(x, keep=1.0, name='drop'):
    #drop = tf.cond(IS_TRAIN_PHASE, lambda: tf.nn.dropout(input, keep), lambda: x)
    drop = tf.cond(IS_TRAIN_PHASE,
                   lambda: tf.nn.dropout(x, keep),
                   lambda: tf.nn.dropout(x, 1))
    return drop


def flatten(x, name='flat'):
    input_shape = x.get_shape().as_list()        # list: [None, 9, 2]
    dim   = np.prod(input_shape[1:])                 # dim = prod(9,2) = 18
    flat  = tf.reshape(x, [-1, dim], name=name)  # -1 means "all"
    return flat

def concat(x, name='cat'):
    cat = tf.concat(values=x, axis=3, name=name)  # TF >= 1.0 signature (axis, not concat_dim)
    return cat


def bn (x, decay=0.9, eps=1e-5, name='bn'):
    with tf.variable_scope(name) as scope:
        bn = tf.cond(IS_TRAIN_PHASE,
            lambda: tf.contrib.layers.batch_norm(x, decay=decay, epsilon=eps, center=True, scale=True,
                              is_training=True, reuse=None,
                              updates_collections=None, scope=scope),
            lambda: tf.contrib.layers.batch_norm(x, decay=decay, epsilon=eps, center=True, scale=True,
                              is_training=False, reuse=True,
                              updates_collections=None, scope=scope))

    return bn
In [11]:
# basic building blocks

def bn_relu_conv2d (x, num_kernels=1, kernel_size=(1, 1), stride=[1, 1, 1, 1], padding='SAME', name='conv'):
    with tf.variable_scope(name) as scope:
        block = bn(x)
        block = relu(block)
        block = conv2d(block, num_kernels=num_kernels, kernel_size=kernel_size, stride=stride, padding=padding, has_bias=False)
    return block


def dense_block_cbr (x, num=1, num_kernels=1, kernel_size=(1, 1), drop=None, name='DENSE'):
 
    block = x
    for n in  range(num):
        with tf.variable_scope(name+'_%d'%n) as scope:
            conv = conv2d(block, num_kernels=num_kernels, kernel_size=kernel_size, stride=[1,1,1,1], padding='SAME', has_bias=False)
            conv = bn(conv)
            conv = relu(conv)

            if drop is not None:
                keep = (1 - drop) ** (1. / num)
                conv = dropout(conv, keep=keep)

            block = concat((block, conv))
    return block
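
A quick numeric check of the per-layer keep probability used above: if an overall dropout rate of drop is to be spread across the whole block, each of the num layers keeps units with probability (1 - drop)^(1/num), so surviving all layers recovers the overall keep rate. (drop=0.2 here is illustrative; the network below actually passes drop=None and applies dropout outside the block.)

In [ ]:
# Worked check of keep = (1 - drop) ** (1. / num) from dense_block_cbr
drop, num = 0.2, 4
keep = (1 - drop) ** (1. / num)
print(keep)          # ~0.9457 per layer
print(keep ** num)   # 0.8 == 1 - drop overall
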
In [12]:
# the loss 
def l2_regulariser(decay):

    variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
    for v in variables:
        name = v.name
        if 'weight' in name:  #this is weight
            l2 = decay * tf.nn.l2_loss(v)
            tf.add_to_collection('losses', l2)
        elif 'bias' in name:  #this is bias
            pass
        elif 'beta' in name:
            pass
        elif 'gamma' in name:
            pass
        elif 'moving_mean' in name:
            pass
        elif 'moving_variance' in name:
            pass
        elif 'moments' in name:
            pass

        else:
            #pass
            #raise Exception('unknown variable type: %s ?'%name)
            pass

    l2_loss = tf.add_n(tf.get_collection('losses'))
    return l2_loss


def cross_entropy(logit, label, name='cross_entropy'):
    label = tf.cast(label, tf.int64)
    cross_entropy = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logit, labels=label), name=name)
    return cross_entropy


def accuracy(prob, label, name='accuracy'):
    correct_prediction = tf.equal(tf.argmax(prob, 1), tf.cast(label, tf.int64))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name=name)
    return accuracy
In [13]:
# The densenet network with the inference part (without loss)

def DenseNet( input_shape=(1,1,1), output_shape = (1)):
    H, W, C   = input_shape
    num_class = output_shape
    x     = tf.placeholder(shape=[None, H, W, C], dtype=tf.float32, name='x')

    #color preprocessing using conv net:
    #see "Systematic evaluation of CNN advances on the ImageNet"-Dmytro Mishkin, Nikolay Sergievskiy, Jiri Matas, ARXIV 2016
    # https://arxiv.org/abs/1606.02228
    # we use a learnable prelu (different from the paper) and a 3x3 conv
    with tf.variable_scope('preprocess') as scope:
        x = bn(x, name='b1')
        x = conv2d(x, num_kernels=8, kernel_size=(3, 3), stride=[1, 1, 1, 1], padding='SAME', has_bias=True, name='c1')
        x = prelu(x, name='r1')
        x = conv2d(x, num_kernels=8, kernel_size=(1, 1), stride=[1, 1, 1, 1], padding='SAME', has_bias=True, name='c2')
        x = prelu(x, name='r2')

    with tf.variable_scope('block1') as scope:
        block1 = bn_relu_conv2d(x, num_kernels=32, kernel_size=(5, 5), stride=[1, 1, 1, 1], padding='SAME')
        block1 = maxpool(block1, kernel_size=(2,2), stride=[1, 2, 2, 1], padding='SAME')

    # dropout is taken out of the block
    with tf.variable_scope('block2') as scope:
        block2 = dense_block_cbr(block1, num=4, num_kernels=16, kernel_size=(3, 3), drop=None)
        block2 = maxpool(block2, kernel_size=(2, 2), stride=[1, 2, 2, 1], padding='SAME')

    with tf.variable_scope('block3') as scope:
        block3 = dense_block_cbr(block2, num=4, num_kernels=24, kernel_size=(3, 3), drop=None)
        block3 = dropout(block3, keep=0.9)
        block3 = maxpool(block3,  kernel_size=(2,2), stride=[1, 2, 2, 1], padding='SAME')

    with tf.variable_scope('block4') as scope:
        block4 = dense_block_cbr(block3, num=4, num_kernels=32, kernel_size=(3, 3), drop=None)
        block4 = bn_relu_conv2d(block4, num_kernels=num_class, kernel_size=(1,1), stride=[1, 1, 1, 1], padding='SAME')
        block4 = dropout(block4, keep=0.8)
        block4 = avgpool(block4, is_global_pool=True)

    return block1, block2, block3, block4
In [14]:
# construct the graph here
block1, block2, block3, logit  = DenseNet(input_shape =(height, width, channel), output_shape=(num_class))
data   = tf.get_default_graph().get_tensor_by_name('x:0')
label  = tf.placeholder(dtype=tf.int32, shape=[None])
prob   = tf.nn.softmax(logit)

l2     = l2_regulariser(decay=0.0005)
loss   = cross_entropy(logit, label)
metric = accuracy(prob, label)

C. Train, Validate and Test the Model

A validation set can be used to assess how well the model is performing. Low accuracy on both the training and validation sets implies underfitting. High accuracy on the training set but low accuracy on the validation set implies overfitting.
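
As a rough diagnostic, this rule of thumb can be expressed directly (illustrative thresholds and hypothetical accuracy values, not part of the training loop below):

In [ ]:
# Hypothetical accuracies to illustrate the diagnosis described above
train_acc, valid_acc = 0.99, 0.90

if train_acc < 0.90 and valid_acc < 0.90:
    print('underfitting: both accuracies are low')
elif train_acc - valid_acc > 0.05:
    print('overfitting: train accuracy far exceeds validation accuracy')
else:
    print('reasonable fit')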

In [15]:
### Train your model here.
### Feel free to use as many code cells as needed.

# changing of learning rate
def schedule_by_step( r, steps=(0,100), items=(0.1,0.01)):

    item = items[0]
    N=len(steps)
    for n in range(N):
        if r >= steps[n]:
            item = items[n]
    return item
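
# Example (hypothetical run index): schedule_by_step(4, steps=(0, 3, 6, 8), items=(0.1, 0.01, 0.001, 0.0001))
# returns 0.01, since run 4 has passed the boundaries 0 and 3 but not yet 6.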


#for testing and validation
def test_net( datas, labels, batch_size, data, label, loss, metric, sess):

    num = len(datas)
    all_loss = 0
    all_acc = 0
    total = 0
    for n in range(0, num, batch_size):
        #print('\r  evaluating .... %d/%d' % (n, num), end='', flush=True)
        start = n
        end = start+batch_size if start+batch_size<=num else num
        batch_datas  = datas  [start:end]
        batch_labels = labels [start:end]

        fd = {data: batch_datas, label: batch_labels, IS_TRAIN_PHASE : False}
        test_loss, test_acc = sess.run([loss, metric], feed_dict=fd)

        a = end-start
        total += a
        all_loss += a*test_loss
        all_acc  += a*test_acc

    assert(total==num)
    loss = all_loss/total
    acc  = all_acc/total

    return loss, acc
In [17]:
#solver 
epoch_log  = 2 
max_run    = 8
batch_size = 128 #128  #256  384  #128
steps = (0, 3, 6, 8)  
rates = (0.1, 0.01,  0.001, 0.0001) 

learning_rate = tf.placeholder(tf.float32, shape=[])
solver = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9)
solver_step = solver.minimize(loss+l2)

# start training here ------------------------------------------------
print ('start training')
sess.run(tf.global_variables_initializer(), feed_dict = {IS_TRAIN_PHASE : True } )
saver  = tf.train.Saver()
writer = tf.summary.FileWriter(OUT_DIR + '/tf', graph=tf.get_default_graph())


# keep a log   
print('')
print(' run  epoch   iter    rate      |  train_loss    (acc)     |  valid_loss    (acc)     |  time ')
print('----------------------------------------------------------------------------------------------')

tic = timer()
iter = 0
for r in range(max_run):
    rate = schedule_by_step(r, steps=steps, items=rates)

    augment_images, augment_labels = shuffle_data_uniform(train_images, train_labels, num_class,  num_per_class=num_per_class)
    augment_images = make_perturb_images(augment_images, keep=keep)


    num_augment = len(augment_images)
    N = max(num_augment//batch_size-1,1)
    #iter_log = round(float(num_train) / float(num_augment) * float(N))
    iter_log = max(round(float( epoch_log *num_train ) / float(batch_size)),1)
    for n in tqdm_notebook(range(N), desc="Training"):
        iter  = iter + 1
        run   = r + float(n)/float(N)
        epoch = float(iter*batch_size)/float(num_train)

        batch_datas, batch_labels = generate_train_batch_next( augment_images, augment_labels, n, batch_size )

        fd = {data: batch_datas, label: batch_labels, learning_rate: rate, IS_TRAIN_PHASE : True }
        _, batch_loss, batch_acc, = sess.run([solver_step, loss, metric ],feed_dict=fd)

      
        print('\r%4.1f  %5.1f   %05d   %f |  %f    (%f)  ' %
                  (run, epoch, iter, rate, batch_loss, batch_acc), end='', flush=True)

        #do validation here!
        if iter%iter_log==0 or (r==max_run-1 and n==N-1): 
            toc = timer()
            sec_pass = toc - tic
            min_pass = sec_pass/60.

            #validation
            val_loss, val_acc =  test_net(valid_images, valid_labels, batch_size, data, label, loss, metric, sess)

            #print('\r')
            print('\r%4.1f  %5.1f   %05d   %f |  %f    (%f) |  %f    (%f) | %4.1f min' %
                  (run, epoch, iter, rate, batch_loss, batch_acc, val_loss, val_acc, min_pass ), end='\n',flush=True)


        pass

    # save intermediate checkpoint
    # saver.save(sess, out_dir + '/check_points/%06d.ckpt'%r)  #iter


#final test! ------------------------------------------
# save final checkpoint
os.makedirs(OUT_DIR + '/check_points', exist_ok=True)  # create the checkpoint directory (not a directory named final.ckpt)
saver.save(sess, OUT_DIR + '/check_points/final.ckpt')

print('\n') 
print('** evaluation on test set **' )
test_loss, test_acc = test_net(test_images, test_labels, batch_size, data, label, loss, metric, sess)
print('test_loss=%f    (test_acc=%f)' % ( test_loss, test_acc))
start training

 run  epoch   iter    rate      |  train_loss    (acc)     |  valid_loss    (acc)     |  time 
----------------------------------------------------------------------------------------------

 1.0    1.3   00334   0.100000 |  1.131707    (0.617188)  

 1.5    2.0   00497   0.100000 |  0.726358    (0.773438) |  0.813372    (0.724333) |  1.5 min
 2.0    2.7   00668   0.100000 |  0.474762    (0.812500)  

 3.0    4.0   00994   0.100000 |  0.123428    (0.976562) |  0.168394    (0.945000) |  2.9 min
 3.0    4.0   01002   0.100000 |  0.180594    (0.960938)  

 4.0    5.4   01336   0.010000 |  0.137669    (0.945312)  

 4.5    6.0   01491   0.010000 |  0.116610    (0.960938) |  0.016531    (0.995333) |  4.4 min
 5.0    6.7   01670   0.010000 |  0.076523    (0.976562)  

 5.9    8.0   01988   0.010000 |  0.065472    (0.976562) |  0.012188    (0.997333) |  5.7 min
 6.0    8.1   02004   0.010000 |  0.045050    (0.992188)  

 7.0    9.4   02338   0.001000 |  0.064316    (0.984375)  

 7.4   10.0   02485   0.001000 |  0.056892    (0.984375) |  0.009971    (0.998000) |  7.3 min
 8.0   10.8   02672   0.001000 |  0.056222    (0.992188) |  0.010780    (0.998000) |  7.7 min



** evaluation on test set **
test_loss=0.042439    (test_acc=0.987648)

D. Test a Model on New Images

To give yourself more insight into how your model is working, download at least five pictures of German traffic signs from the web and use your model to predict the traffic sign type.

You may find signnames.csv useful as it contains mappings from the class id (integer) to the actual sign name.
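
The classnames list built from signnames.csv in load_data() already provides this mapping; for example (class ids taken from the predictions further below):

In [ ]:
# Map class ids to sign names
print(classnames[14])   # Stop
print(classnames[17])   # No entry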

E. Load and Output the Images

In [61]:
test_files=['0002.jpg',   #normal
            '0000.jpg',   #normal
            '0004.jpg',   #occluded with snow
            '0006.jpg',   #small
            '0001.jpg',   #not in class
            ]
test_rois =[(190,135,405,330),(170,430,207,469),(1120,520,1650,1290),(226,65,242,78 ),(1370,280,1460,400)]
test_label=[17,38,14,40,25]

num=len(test_files)

# crop roi to 32x32 
results_image  = 255. * np.ones(shape=(1 * height, num* width, channel),dtype=np.float32) 
results_image1 = 255. * np.ones(shape=(1 * 320, num* 320, channel),dtype=np.float32)  
crops = np.zeros(shape=(num,height,width,channel),dtype=np.float32)
for n in range(num):
    img = cv2.imread(BASE_DIR+'/extra/' + test_files[n], 1)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) 
    x1, y1, x2, y2 = test_rois[n] 
    crop = cv2.resize(img[y1:y2, x1:x2, :], (0, 0), fx=32. / (x2 - x1), fy=32. / (y2 - y1),
                      interpolation=cv2.INTER_CUBIC)
    
    crop = np.clip(crop,0,255)
    crops[n]=crop
    insert_subimage(results_image, crop, 0, n*width)

    
    # mark the roi and show
    H,W,C=img.shape
    S=max(H,W)
    f=320./S
    norm_img = cv2.resize(img, (0, 0), fx=f, fy=f, interpolation=cv2.INTER_CUBIC)  
    cv2.rectangle(norm_img, (round(f*x1), round(f*y1)), (round(f*x2), round(f*y2)), (255,255,0), 3)
    insert_subimage(results_image1, norm_img, 0, n*320)     
    #cv2.imshow('crop', crop)
    #cv2.imshow('img', img)
    #cv2.waitKey(0)

cv2.imwrite(OUT_DIR+'/extra_crops.jpg',  cv2.cvtColor(results_image, cv2.COLOR_BGR2RGB) )
cv2.imwrite(OUT_DIR+'/extra_marked.jpg', cv2.cvtColor(results_image1, cv2.COLOR_BGR2RGB) )
plt.rcParams["figure.figsize"] = (25,25)
plt.imshow(results_image.astype(np.uint8))
plt.axis('off') 
plt.show()

plt.imshow(results_image1.astype(np.uint8))
plt.axis('off') 
plt.show()

F. Predict the Sign Type for Each Image

In [62]:
new_images = crops

# load trained classifier
saver  = tf.train.Saver()
saver.restore(sess, OUT_DIR + '/check_points/final.ckpt')

print('** test on extra **')
fd = {data: new_images, IS_TRAIN_PHASE: False}
test_prob = sess.run(prob, feed_dict=fd)

print('see printout of results in the next jupyter cell!')       
print('success')
** test on extra **
see printout of results in the next jupyter cell!
success

G. Analyze Performance

In [63]:
### Calculate the accuracy for these 5 new images. 
### For example, if the model predicted 1 out of 5 signs correctly, it's 20% accurate on these new images.
### Visualize the softmax probabilities here.

#show results 
f=10
results_image = 255. * np.ones(shape=(5*(f*height + f*8), 6*f*width, channel), dtype=np.float32)

for n in range(num):
    crop = crops[n] 
    c_hat=test_label[n]
    c_hat_label = classnames[c_hat] if c_hat>=0 else 'NIL'
    print('n=%d: true = %02d:%s' % (n, c_hat,c_hat_label))
    
    #crop = cv2.resize(crop, (0, 0), fx=f, fy=f, interpolation=cv2.INTER_NN)
    crop = crop.repeat(f, axis=0).repeat(f, axis=1)
    insert_subimage(results_image, crop, n * (f*height + f*8), 0) 
    cv2.putText(results_image, '%02d:%s%s' % (c_hat,c_hat_label[0:15], '...' if len(classnames[c_hat])>15 else ''), 
                (5, (n+1) * (f*height + f*8)-50), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
     

    p = test_prob[n]
    idx = np.argsort(p)[::-1]
    for k in range(5):
        c = int(idx[k])
        label_image = get_image_label(c)
        #label_image = cv2.resize(label_image, (0, 0), fx=f, fy=f, interpolation=cv2.INTER_NN)
        label_image = label_image.repeat(f, axis=0).repeat(f, axis=1)
        insert_subimage(results_image, label_image, n * (f*height + f*8), (k + 1) * f*width)


        print('\ttop%d: %f  %02d:%s' % (k, p[c], c, classnames[c]))
        cv2.putText(results_image, '%02d:%s%s' % (c, classnames[c][0:15], '...' if len(classnames[c])>15 else ''), 
                    (5+(k + 1) * f*width, (n+1) * (f*height + f*8)-50), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
        cv2.putText(results_image, 'top%d: %f' % (k, p[c]), 
                    (5+(k + 1) * f*width, (n+1) * (f*height + f*8)-20), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
    print('')
       
print('')
print('')    
print('**visual results**:  X, followed by top-5')
cv2.imwrite(OUT_DIR+'/extra_predictions.jpg', cv2.cvtColor(results_image, cv2.COLOR_BGR2RGB) )
plt.rcParams["figure.figsize"] = (30,30)
plt.imshow(results_image.astype(np.uint8))
plt.axis('off') 
plt.show()
n=0: true = 17:No entry
	top0: 0.997443  17:No entry
	top1: 0.002229  14:Stop
	top2: 0.000063  19:Dangerous curve to the left
	top3: 0.000060  13:Yield
	top4: 0.000041  31:Wild animals crossing

n=1: true = 38:Keep right
	top0: 0.995767  38:Keep right
	top1: 0.001195  40:Roundabout mandatory
	top2: 0.001141  34:Turn left ahead
	top3: 0.001133  23:Slippery road
	top4: 0.000443  39:Keep left

n=2: true = 14:Stop
	top0: 0.999441  14:Stop
	top1: 0.000125  00:Speed limit (20km/h)
	top2: 0.000074  04:Speed limit (70km/h)
	top3: 0.000070  02:Speed limit (50km/h)
	top4: 0.000059  29:Bicycles crossing

n=3: true = 40:Roundabout mandatory
	top0: 0.898978  40:Roundabout mandatory
	top1: 0.040979  37:Go straight or left
	top2: 0.034417  39:Keep left
	top3: 0.009072  38:Keep right
	top4: 0.003928  34:Turn left ahead

n=4: true = 25:Road work
	top0: 0.632426  39:Keep left
	top1: 0.126607  25:Road work
	top2: 0.060975  38:Keep right
	top3: 0.025718  26:Traffic signals
	top4: 0.018464  31:Wild animals crossing



**visual results**:  X, followed by top-5

H. Visualize the Neural Network's State with Test Images

This section is not required, but acts as an additional exercise for understanding the output of a neural network's weights. While neural networks can be a great learning device, they are often referred to as a black box. We can better understand the weights of a neural network by plotting their feature maps. After successfully training the neural network, we can see what its feature maps look like by plotting the output of the network's weight layers in response to a test stimulus image. From these plotted feature maps, it is possible to see which characteristics of an image the network finds interesting. For a sign, the inner feature maps may react with high activation to the sign's boundary outline or to the contrast in the sign's painted symbol.

In [92]:
from tensorflow.python.tools.inspect_checkpoint import print_tensors_in_checkpoint_file
### Visualize feature maps based on the activations functions
# image_input: the test image being fed into the network to produce the feature maps
# tf_activation: should be a tf variable name used during your training procedure that represents the calculated state of a specific weight layer
# activation_min/max: can be used to view the activation contrast in more detail, by default matplot sets min and max to the actual min and max values of the output
# plt_num: used to plot out multiple different weight feature map sets on the same block, just extend the plt number for each new feature map entry
def outputFeatureMap(image_input, tf_activation, activation_min=-1, activation_max=-1, plt_num=1):
    # Here make sure to preprocess your image_input in a way your network expects
    # with size, normalization, ect if needed
    # image_input =
    # Note: x should be the same name as your network's tensorflow data placeholder variable
    # If you get an error tf_activation is not defined it may be having trouble accessing the variable from inside a function
    activation = tf_activation.eval(session=sess,feed_dict={data : image_input, IS_TRAIN_PHASE: 0})
    featuremaps = activation.shape[3]
    fig = plt.figure(plt_num, figsize=(15,8*(featuremaps//32)))
    for featuremap in tqdm_notebook(range(featuremaps), desc='Preparing visualizer'):
        plt.subplot(math.ceil(featuremaps/8),8, featuremap+1) # sets the number of feature maps to show on each row and column
         # displays the feature map number
        plt.title('FeatureMap ' + str(featuremap))
        if activation_min != -1 and activation_max != -1:  # use `and`, not `&`, to avoid an operator-precedence bug
            plt.imshow(activation[0,:,:, featuremap], interpolation="nearest", vmin=activation_min, vmax=activation_max, cmap="gray")
        elif activation_max != -1:
            plt.imshow(activation[0,:,:, featuremap], interpolation="nearest", vmax=activation_max, cmap="gray")
        elif activation_min != -1:
            plt.imshow(activation[0,:,:, featuremap], interpolation="nearest", vmin=activation_min, cmap="gray")
        else:
            plt.imshow(activation[0,:,:, featuremap], interpolation="nearest", cmap="gray")
    # add the title and save once, after all feature maps have been drawn
    plt.suptitle('Convolution activation layer ' + str(plt_num), size=16)
    plt.savefig(OUT_DIR+'/visualize_image_CNN_'+str(plt_num)+'.png', bbox_inches="tight")
            
with tf.Session() as sess:
    saver.restore(sess, OUT_DIR + '/check_points/final.ckpt')
    ### Feature map highlighter
    ix = int(np.random.random() * X_test.shape[0])
    random_image = np.expand_dims(X_test[ix], axis=0)
    plt.figure(figsize=(3,3))
    plt.imshow(X_test[ix])
    plt.show()
    resized_image = cv2.resize(X_test[ix], (50, 50)) 
    cv2.imwrite(OUT_DIR+'/visualize_image.png', cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB))  # save the enlarged copy
    outputFeatureMap(random_image, block1, plt_num=1)
    #outputFeatureMap(random_image, block2, plt_num=2)

In [ ]: